package com.shockweb.register;


import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import com.shockweb.bridge.DataAgreement;
import com.shockweb.bridge.HostInfo;
import com.shockweb.bridge.OperationDefine;
import com.shockweb.bridge.ServerInfo;
import com.shockweb.common.log.LogManager;
import com.shockweb.register.data.ServiceRoot;
import com.shockweb.utils.Convert;
import com.shockweb.common.International;
import com.shockweb.common.context.ContextManager;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * 注册服务器接收请求的handler
 * 
 * @author 彭明华
 * 2018年1月3日 创建
 */
public class RegisterServerNettyHandler extends ChannelInboundHandlerAdapter {

    
	/**
	 * 被调用次数
	 */
    private static long call = 0;
    
    /**
     * 被调用次数
     * @return
     */
    public static long getCall() {
        return call;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        LogManager.infoLog(this.getClass(),ctx.channel().remoteAddress().toString() + "连接" + ctx.channel().localAddress().toString());
    }
    
    /**
     * 服务端接收客户端发来的数据时触发
     */
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = null;
        //判断数据类型
        if (msg instanceof ByteBuf) {
            buf = (ByteBuf) msg;
        } else {
        	LogManager.errorLog(this.getClass(), new RegisterServerException("接收到非法数据,非ByteBuf数据"));
            return;
        }
        call++;
        try {
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            byte operation = DataAgreement.resolutionOperation(data);
            if (operation == OperationDefine.ALIVE.value()) {
            	return;
            }
            String uuid = DataAgreement.resolutionUUID(data);
            ContextManager.setUuid(uuid);
            int offset = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
            try{
	            if (operation == OperationDefine.REQ_PUT_HOSTS.value()) {
	            	HostInfo info = (HostInfo)Convert.convertToObject(data, offset, data.length-offset, ServerInfo.class);
	            	ServiceRoot.getServiceRoot().putHostInfo(info);
	            	SyncThread.getInstance().addClusterHost(info);
	            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
	            } else if (operation == OperationDefine.REQ_PUT_SERVICES.value()) {
	            	ServerInfo info = (ServerInfo)Convert.convertToObject(data, offset, data.length-offset, ServerInfo.class);
	            	ServiceRoot.getServiceRoot().putServerInfo(info);
	            	SyncThread.getInstance().addClusterServerInfo(info);
	            	info.setCalled(System.currentTimeMillis());
	            	sendData(ctx.channel(),OperationDefine.RES_RESULT,uuid,Convert.convertToBytes(info));
	            } else if (operation == OperationDefine.REQ_SYNC_HOSTS.value()) {
					List<HostInfo> infos = (List<HostInfo>)Convert.convertToObject(data, offset, data.length-offset, ArrayList.class);
	            	for(HostInfo info:infos){
	            		ServiceRoot.getServiceRoot().putHostInfo(info);
	            	}
	            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
	            } else if (operation == OperationDefine.REQ_SYNC_SERVICES.value()) {
					List<ServerInfo> infos = (List<ServerInfo>)Convert.convertToObject(data, offset, data.length-offset, ArrayList.class);
	            	for(ServerInfo info:infos){
	            		ServiceRoot.getServiceRoot().putServerInfo(info);
	            	}
	            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
	            } else if (operation == OperationDefine.REQ_CLEAR_SERVICES.value()) {
	            	ServiceRoot.getServiceRoot().clear();
	            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
	            } else if (operation == OperationDefine.REQ_QUERY_SERVICES.value()) {
	            	byte[] rtn = Convert.convertToBytes(ServiceRoot.getServiceRoot().getServiceSpaces());
	            	sendData(ctx.channel(),OperationDefine.RES_ALL_SERVICES,uuid,rtn);
	            } else if (operation == OperationDefine.REQ_STOP.value()) {
	            	RegisterServer.stop();
	            	sendData(ctx.channel(),OperationDefine.RES_SUCCESS,uuid,null);
	            }else{
	            	LogManager.errorLog(this.getClass(), new RegisterServerException("接收到非法数据,非ByteBuf数据,operation=" + operation));
	            }
            } catch (Exception e) {
            	sendException(ctx.channel(),OperationDefine.RES_ERROR,uuid,e);
            	LogManager.errorLog(this.getClass(), new RegisterServerException("接收到非法数据,非ByteBuf数据",e));
            }
        } catch (Exception e) {
        	sendException(ctx.channel(),OperationDefine.RES_ERROR,UUID.randomUUID().toString(),e);
        	LogManager.errorLog(this.getClass(), new RegisterServerException("接收到非法数据,读取operation或uuid出错",e));
        } finally {
        	if(buf != null){
        		buf.release();//手动释放缓冲区
        	}
        }
    }
    
    /**
     * 将错误信息写入通道
     * @param channel
     * @param operation
     * @param data
     * @throws UnsupportedEncodingException 
     */
    public static void sendException(Channel channel, OperationDefine operation,String uuid,Exception e) throws UnsupportedEncodingException{
    	String message = LogManager.exceptionToString(e);
    	if(message!=null){
    		sendData(channel,operation,uuid,message.getBytes(International.CHARSET));
    	}else{
    		sendData(channel,operation,uuid,null);
    	}
    }
    
    /**
     * 将结果数据写入通道
     * @param channel
     * @param operation
     * @param data
     * @throws UnsupportedEncodingException 
     */
    public static void sendData(Channel channel, OperationDefine operation,String uuid,byte[] data) throws UnsupportedEncodingException{
        if (channel != null && channel.isOpen() && channel.isActive() && channel.isWritable()) {
            ByteBufAllocator alloc = channel.alloc();
        	int len = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
            if (data != null) {
                len = len + data.length;
            }
            ByteBuf buf = alloc.buffer(len);
            buf.writeByte(operation.value());
            buf.writeBytes(uuid.getBytes(International.CHARSET));
            if(data!=null){
            	buf.writeBytes(data);
            }
        	channel.writeAndFlush(buf).addListener(
            	new ChannelFutureListener() {
                	public void operationComplete(ChannelFuture f)throws Exception {
                        if (!f.isSuccess()) {
                            LogManager.errorLog(RegisterServerNettyHandler.class, "Message sending failed",f.cause());
                        }
                    }
                });

        }
    }
    
    /**
     * @see ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.disconnect();// 服务端发送异常时关闭客户端
    }

    /**
     * @see ChannelInboundHandlerAdapter#channelReadComplete(ChannelHandlerContext)
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   		ctx.flush();
    }

    /**
     * 当客户端主动断开服务端的链接后，这个通道就是不活跃的。 也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }
    
    /**
     * 客户端心跳检测
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                ctx.close();
            }
        }
    }


    
    
}
